KAFKA-19871 - Add partition support to TestRecord#22612
Conversation
Co-authored-by: Marie-Laure Momplot <marie-laure.momplot@michelin.com> Co-authored-by: Julien Brunet <julien.brunet2@michelin.com> Co-authored-by: Adam Souquieres <souquieres.adam@gmail.com>
|
|
||
| /** | ||
| * Creates a record. | ||
| * Partition defaults to {@code -1} (no explicit partition set). |
There was a problem hiding this comment.
Should this comment be also on the above constructor?
| this.value = record.value(); | ||
| this.headers = record.headers(); | ||
| this.recordTime = Instant.ofEpochMilli(record.timestamp()); | ||
| this.partition = record.partition(); |
There was a problem hiding this comment.
Technically, there is no guard to what value ConsumerRecord#partition is set -- of course, if we get it from a KafkaConsumer we can expect it to be correctly set. However, new ConsumerRecord is a public API so users could create one manually. Wondering if we should have a guard to check for < 0, and throw(?) or overwrite to -1 (?) for this case, to avoid weird corner cases?
Or would we want to accept that we either crash in < -1 or treat all negative values as "not set" (ie, same as -1)?
There was a problem hiding this comment.
If we choose to throw an exception, I think we could break some exiting tests.
So silent normalization is acceptable, we can overwrite to -1 if the Partition is < 0
| this.value = record.value(); | ||
| this.headers = record.headers(); | ||
| this.recordTime = Instant.ofEpochMilli(record.timestamp()); | ||
| this.partition = record.partition() != null ? record.partition() : NO_PARTITION; |
There was a problem hiding this comment.
Same question -- if record.partition() != null, it could have any value...
For producer value. For the producer case, treating anything negative as invalid and throw could make even more sense compared to ConsumerRecord because here we have the null option.
Thoughts?
There was a problem hiding this comment.
For ProducerRecord, I think it is a little bit different than ConsumerRecord because a ProducerRecord without a partition is certainly a bug from the caller.
In this case we can throw an exception.
There was a problem hiding this comment.
I just realized that ProducerRecord already has a sentinel and does not allow arbitrary negative values. Therefore, mapping null to NO_PARTITION and preserving non-negative partitions should be sufficient:
So can we just add:
this.partition = record.partition() != null && record.partition() >= 0 ? record.partition() : NO_PARTITION;
| } | ||
|
|
||
| /** | ||
| * Compares this record to {@code o} without considering the {@code partition} field. |
There was a problem hiding this comment.
| * Compares this record to {@code o} without considering the {@code partition} field. | |
| * Compares this record to {@code otherRecord} without considering the {@code partition} field. |
| * assertTrue(expected.equalsIgnorePartition(actual)); | ||
| * }</pre> | ||
| * | ||
| * @param o the record to compare against; {@code null} returns {@code false} |
There was a problem hiding this comment.
| * @param o the record to compare against; {@code null} returns {@code false} | |
| * @param otherRecord the record to compare against; {@code null} returns {@code false} |
| * @param o the record to compare against; {@code null} returns {@code false} | ||
| * @return {@code true} if all fields except {@code partition} are equal | ||
| */ | ||
| public boolean equalsIgnorePartition(final TestRecord<? extends K, ? extends V> o) { |
There was a problem hiding this comment.
| public boolean equalsIgnorePartition(final TestRecord<? extends K, ? extends V> o) { | |
| public boolean equalsIgnorePartition(final TestRecord<? extends K, ? extends V> otherRecord) { |
There was a problem hiding this comment.
Should it be only TestRecord<K, V> -- during KIP review, I was also briefly thinking if we would want to support sub-types, but in the end, if we compare for equality, a TestRecord with sub-key-type or sub-value-type is not equals to "this" record, so we don't need to support this and can just use <K,V> and disallow at compile time already?
| return o != null && (this == o || equalsFields(o)); | ||
| } | ||
|
|
||
| private boolean equalsFields(final TestRecord<?, ?> other) { |
There was a problem hiding this comment.
| private boolean equalsFields(final TestRecord<?, ?> other) { | |
| private boolean equalsFields(final TestRecord<K, V> otherRecord) { |
There was a problem hiding this comment.
I need to keep TestRecord<?, ?> to make the code compile here
Co-authored-by: Marie-Laure Momplot <marie-laure.momplot@michelin.com> Co-authored-by: Julien Brunet <julien.brunet2@michelin.com> Co-authored-by: Adam Souquieres <souquieres.adam@gmail.com>
@mjsax I took your remarks into consideration |
lucasbru
left a comment
There was a problem hiding this comment.
Two nits about validation, otherwise LGTM!
| this.value = value; | ||
| this.recordTime = recordTime; | ||
| this.headers = new RecordHeaders(headers); | ||
| this.partition = partition; |
There was a problem hiding this comment.
Maybe validate against negative values? Can I pass the sentinel here? probably not
There was a problem hiding this comment.
Good point.
I think we should still allow the NO_PARTITION sentinel (-1), since existing constructors use it to represent an unspecified partition and callers may want to construct such records explicitly.
I'll add validation so that only NO_PARTITION and non-negative partition values are accepted.
| this.value = record.value(); | ||
| this.headers = record.headers(); | ||
| this.recordTime = Instant.ofEpochMilli(record.timestamp()); | ||
| this.partition = record.partition() < 0 ? NO_PARTITION : record.partition(); |
There was a problem hiding this comment.
ProducerRecord throws on negative partitions, this one silently fixed. Probably both should throw?
There was a problem hiding this comment.
I just updated TestRecord(ProducerRecord). Should both constructors, for ProducerRecord and ConsumerRecord, throw an exception?
|
Could you add a ticket number or MINOR to the PR title? |
There was a problem hiding this comment.
Pull request overview
This PR extends TestRecord (Streams test-utils) to carry Kafka partition metadata, enabling future multi-partition testing support in TopologyTestDriver (KIP-1238 prerequisite).
Changes:
- Add
partitionfield toTestRecord, new constructors to set it explicitly, and accessors. - Update
equals(),hashCode(), andtoString()to include partition; addequalsIgnorePartition()for partition-agnostic assertions. - Expand unit tests to cover default/explicit partition behavior and partition-aware equality/toString.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| streams/test-utils/src/main/java/org/apache/kafka/streams/test/TestRecord.java | Adds partition state to TestRecord and updates equality/hash/toString plus new comparison helper. |
| streams/test-utils/src/test/java/org/apache/kafka/streams/test/TestRecordTest.java | Updates/extends tests to validate partition defaults, explicit partition, and partition-aware behavior. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| public TestRecord(final K key, final V value, final Headers headers, final Instant recordTime, final int partition) { | ||
| this.key = key; | ||
| this.value = value; | ||
| this.recordTime = recordTime; | ||
| this.headers = new RecordHeaders(headers); | ||
| this.partition = partition; | ||
| } |
| * The partition this record is assigned to. | ||
| * A value of {@code -1} is a sentinel meaning "no explicit partition set" and is used | ||
| * only on <em>input</em> records created without an explicit partition argument. | ||
| * Output records read from {@link org.apache.kafka.streams.TestOutputTopic#readRecordsToList()} | ||
| * always carry the real resolved partition ({@code >= 0}). |
| this.key = record.key(); | ||
| this.value = record.value(); | ||
| this.headers = record.headers(); | ||
| this.recordTime = Instant.ofEpochMilli(record.timestamp()); | ||
| this.partition = record.partition() < 0 ? NO_PARTITION : record.partition(); |
| this.headers = record.headers(); | ||
| this.recordTime = Instant.ofEpochMilli(record.timestamp()); | ||
| final Integer partition = record.partition(); | ||
| if (partition != null && partition < 0) { | ||
| throw new IllegalArgumentException( | ||
| "Partition must be >= 0 or null, got: " + partition | ||
| ); | ||
| } | ||
| this.partition = partition != null ? partition : NO_PARTITION; |
| * @param otherRecord the record to compare against; {@code null} returns {@code false} | ||
| * @return {@code true} if all fields except {@code partition} are equal | ||
| */ | ||
| public boolean equalsIgnorePartition(final TestRecord<K, V> otherRecord) { |
| /** | ||
| * Create a {@code TestRecord} from a {@link ConsumerRecord}. | ||
| * The partition is taken from {@link ConsumerRecord#partition()}. | ||
| * | ||
| * @param record The v | ||
| */ |
| public void testConsumerRecord() { | ||
| final String topicName = "topic"; | ||
| final ConsumerRecord<String, Integer> consumerRecord = new ConsumerRecord<>(topicName, 1, 0, recordMs, | ||
| TimestampType.CREATE_TIME, 0, 0, key, value, headers, Optional.empty()); | ||
| final TestRecord<String, Integer> testRecord = new TestRecord<>(consumerRecord); |
Co-authored-by: Marie-Laure Momplot <marie-laure.momplot@michelin.com> Co-authored-by: Julien Brunet <julien.brunet2@michelin.com> Co-authored-by: Adam Souquieres <souquieres.adam@gmail.com>
Co-authored-by: Marie-Laure Momplot <marie-laure.momplot@michelin.com> Co-authored-by: Julien Brunet <julien.brunet2@michelin.com> Co-authored-by: Adam Souquieres <souquieres.adam@gmail.com>
Motivation
As part of KIP-1238: Multi-partition support in
TopologyTestDriver.
TestRecord needs to carry partition information so that records produced
and consumed by TopologyTestDriver can expose the partition they belong
to. This is a prerequisite for supporting multi-partition testing with
TopologyTestDriver.
Changes
This PR updates TestRecord by:
partition to -1 (unspecified);
partition.
Compatibility
Existing TestRecord constructors remain unchanged and continue to assign
partition = -1.
Since no component currently sets a partition value, records created
through existing APIs continue to carry partition = -1, preserving the
behavior of existing tests.
This PR is intentionally limited to introducing partition support in
TestRecord. Follow-up PRs will add multi-partition support to
TopologyTestDriver.
Reviewers: Matthias J. Sax matthias@confluent.io, Lucas Brutschy
lbrutschy@confluent.io